// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
	"context"
	"flag"
	"fmt"
	"slices"
	"testing"
	"time"

	rocks "github.com/cockroachdb/pebble"
	rockssst "github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/google/uuid"
	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/import_sstpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/stretchr/testify/require"
	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/pkg/caller"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type testCase struct {
	sortedKVs [][2][]byte
	ts        uint64

	expectedFilePath string
}

var testCases []*testCase

func init() {
	testCases = make([]*testCase, 0, 2)

	testCases = append(testCases, &testCase{
		sortedKVs: [][2][]byte{
			{[]byte("a"), []byte("1")},
		},
		ts:               1,
		expectedFilePath: "sst-examples/0.sst",
	})

	moreKeys := make([][2][]byte, 10000)
	for i := range moreKeys {
		moreKeys[i] = [2][]byte{
			[]byte("key" + fmt.Sprintf("%09d", i)),
			[]byte("1"),
		}
	}
	testCases = append(testCases, &testCase{
		sortedKVs:        moreKeys,
		ts:               404411537129996288,
		expectedFilePath: "sst-examples/1.sst",
	})
}

// write2ImportService4Test writes these sorted key-value pairs to the TiKV
// cluster. SST files are generated by TiKV and saved in import directory if no
// error happens.
func write2ImportService4Test(
	ctx context.Context,
	pdAddrs []string,
	sortedKVs [][2][]byte,
	ts uint64,
) ([]*import_sstpb.SSTMeta, error) {
	pdClient, err := pd.NewClient(caller.TestComponent, pdAddrs, pd.SecurityOption{})
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer pdClient.Close()

	r0, err := pdClient.GetRegion(ctx, sortedKVs[0][0])
	if err != nil {
		return nil, errors.Trace(err)
	}
	r1, err := pdClient.GetRegion(ctx, sortedKVs[len(sortedKVs)-1][0])
	if err != nil {
		return nil, errors.Trace(err)
	}
	if r0.Meta.Id != r1.Meta.Id {
		return nil, errors.Errorf(
			"only support write to the same region, "+
				"first key: %X, last key: %X, "+
				"first region id: %d, last region id: %d",
			sortedKVs[0][0], sortedKVs[len(sortedKVs)-1][0],
			r0.Meta.Id, r1.Meta.Id,
		)
	}

	store, err := pdClient.GetStore(ctx, r0.Leader.GetStoreId())
	if err != nil {
		return nil, errors.Trace(err)
	}
	conn, err := grpc.DialContext(
		ctx, store.GetAddress(),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer conn.Close()

	ingestClient := import_sstpb.NewImportSSTClient(conn)
	writeStream, err := ingestClient.Write(ctx)
	if err != nil {
		return nil, errors.Trace(err)
	}
	u := uuid.New()
	writeMeta := &import_sstpb.SSTMeta{
		Uuid:        u[:],
		RegionId:    r0.Meta.Id,
		RegionEpoch: r0.Meta.RegionEpoch,
		Range: &import_sstpb.Range{
			Start: sortedKVs[0][0],
			End:   sortedKVs[len(sortedKVs)-1][0],
		},
	}
	rpcCtx := kvrpcpb.Context{
		RegionId:    r0.Meta.Id,
		RegionEpoch: r0.Meta.RegionEpoch,
		Peer:        r0.Leader,
	}
	err = writeStream.Send(&import_sstpb.WriteRequest{
		Chunk:   &import_sstpb.WriteRequest_Meta{Meta: writeMeta},
		Context: &rpcCtx,
	})
	if err != nil {
		return nil, errors.Trace(err)
	}

	batch := &import_sstpb.WriteBatch{
		CommitTs: ts,
		Pairs:    make([]*import_sstpb.Pair, 0, len(sortedKVs)),
	}
	for _, kv := range sortedKVs {
		batch.Pairs = append(batch.Pairs, &import_sstpb.Pair{
			Key:   kv[0],
			Value: kv[1],
		})
	}
	err = writeStream.Send(&import_sstpb.WriteRequest{
		Chunk:   &import_sstpb.WriteRequest_Batch{Batch: batch},
		Context: &rpcCtx,
	})
	if err != nil {
		return nil, errors.Trace(err)
	}
	resp, err := writeStream.CloseAndRecv()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if resp.GetError() != nil {
		return nil, errors.Errorf("write failed: %s", resp.GetError())
	}
	return resp.Metas, nil
}

var tikvWriteTest = flag.Bool("tikv-write-test", false, "run TestIntegrationTest")

func TestIntegrationTest(t *testing.T) {
	if !*tikvWriteTest {
		t.Skip(`This is a manual test. You can use tiup playground and run this test. After the test is finished, find the SST files in the import directory of the TiKV node.`)
	}

	ctx := context.Background()
	pdAddrs := []string{"127.0.0.1:2379"}
	sortedKVs := make([][2][]byte, 1_000_000)
	for i := range sortedKVs {
		sortedKVs[i] = [2][]byte{
			[]byte("key" + fmt.Sprintf("%09d", i)),
			[]byte("1"),
		}
	}
	ts := uint64(404411537129996288)

	sstPath := "/tmp/go-write-cf.sst"
	now := time.Now()
	pebbleWriteSST(t, sstPath, sortedKVs, ts)
	t.Logf("write to SST takes %v", time.Since(now))

	now = time.Now()
	metas, err := write2ImportService4Test(ctx, pdAddrs, sortedKVs, ts)
	t.Logf("write to TiKV takes %v", time.Since(now))
	require.NoError(t, err)
	for _, meta := range metas {
		t.Logf("meta UUID: %v", uuid.UUID(meta.Uuid).String())
	}
}

func pebbleWriteSST(
	t *testing.T,
	path string,
	sortedKVs [][2][]byte,
	ts uint64,
) {
	writer, err := newWriteCFWriter(path, ts)
	require.NoError(t, err)

	for _, kv := range sortedKVs {
		err = writer.set(kv[0], kv[1])
		require.NoError(t, err)
	}

	err = writer.close()
	require.NoError(t, err)
}

func TestPebbleWriteSST(t *testing.T) {
	t.Skip("skip because need patched pebble")
	for i, c := range testCases {
		t.Logf("start test case %d", i)
		testPebbleWriteSST(t, c)
	}
}

func testPebbleWriteSST(
	t *testing.T,
	c *testCase,
) {
	sstPath := "/tmp/test-write.sst"
	pebbleWriteSST(t, sstPath, c.sortedKVs, c.ts)

	f, err := vfs.Default.Open(sstPath)
	require.NoError(t, err)
	readable, err := rockssst.NewSimpleReadable(f)
	require.NoError(t, err)
	reader, err := rockssst.NewReader(readable, rockssst.ReaderOptions{})
	require.NoError(t, err)
	defer reader.Close()

	goSSTKVs, goSSTProperties := getData2Compare(t, reader)
	require.Len(t, goSSTKVs, len(c.sortedKVs))

	f2, err := vfs.Default.Open(c.expectedFilePath)
	require.NoError(t, err)
	readable2, err := rockssst.NewSimpleReadable(f2)
	require.NoError(t, err)
	reader2, err := rockssst.NewReader(readable2, rockssst.ReaderOptions{})
	require.NoError(t, err)
	defer reader2.Close()

	tikvSSTKVs, tikvSSTProperties := getData2Compare(t, reader2)

	require.Equal(t, len(tikvSSTKVs), len(goSSTKVs))
	for i, kv := range goSSTKVs {
		require.Equal(t, kv[0], tikvSSTKVs[i][0], "key mismatch. index: %d", i)
		require.Equal(t, kv[1], tikvSSTKVs[i][1], "value mismatch. index: %d", i)
	}
	require.Equal(t, tikvSSTProperties, goSSTProperties)
}

func getData2Compare(
	t *testing.T,
	reader *rockssst.Reader,
) (kvs [][2][]byte, properties *rockssst.Properties) {
	iter, err := reader.NewIter(nil, nil)
	require.NoError(t, err)
	defer iter.Close()

	realKVs := make([][2][]byte, 0, 10240)

	k, v := iter.First()
	require.NotNil(t, k)
	getKey := func(k *rocks.InternalKey) []byte {
		return slices.Clone(k.UserKey)
	}
	getValue := func(v rocks.LazyValue) []byte {
		realV, callerOwned, err2 := v.Value(nil)
		require.NoError(t, err2)
		if !callerOwned {
			realV = slices.Clone(realV)
		}
		return realV
	}
	realKVs = append(realKVs, [2][]byte{getKey(k), getValue(v)})
	for {
		k, v = iter.Next()
		if k == nil {
			break
		}
		realKVs = append(realKVs, [2][]byte{getKey(k), getValue(v)})
	}

	//p := reader.Properties.Clone()
	//
	//// delete the identity properties
	//delete(p.UserProperties, "rocksdb.creating.db.identity")
	//delete(p.UserProperties, "rocksdb.creating.host.identity")
	//delete(p.UserProperties, "rocksdb.creating.session.identity")
	//delete(p.UserProperties, "rocksdb.original.file.number")
	//
	//// delete some mismatch properties because compress layer has different behaviour
	//p.DataSize = 0
	//p.NumDataBlocks = 0
	//p.IndexSize = 0
	//
	//// TODO(lance6716): check why it's different, can we tune bloomfilter to get the
	//// same behaviour?
	//p.FilterSize = 0
	//delete(p.UserProperties, "rocksdb.num.filter_entries")
	//
	//// TODO(lance6716): in integration tests we need to check
	//// rocksdb.tail.start.offset equals to rocksdb.data.size
	//delete(p.UserProperties, "rocksdb.tail.start.offset")
	//p.Loaded = nil

	return realKVs, nil
}

func TestDebugReadSST(t *testing.T) {
	t.Skip("this is a manual test")

	sstPath := "/tmp/test.sst"
	t.Logf("read sst: %s", sstPath)
	f, err := vfs.Default.Open(sstPath)
	require.NoError(t, err)
	readable, err := rockssst.NewSimpleReadable(f)
	require.NoError(t, err)
	reader, err := rockssst.NewReader(readable, rockssst.ReaderOptions{})
	require.NoError(t, err)
	defer reader.Close()

	t.Logf("properties:\n %s", reader.Properties.String())
	t.SkipNow()

	iter, err := reader.NewIter(nil, nil)
	require.NoError(t, err)
	defer iter.Close()

	k, v := iter.First()
	if k == nil {
		return
	}
	getValue := func(v rocks.LazyValue) []byte {
		realV, _, err2 := v.Value(nil)
		require.NoError(t, err2)
		return realV
	}
	t.Logf("key: %X\nvalue: %X", k.UserKey, getValue(v))
	for {
		k, v = iter.Next()
		if k == nil {
			break
		}
		t.Logf("key: %X\nvalue: %X", k.UserKey, getValue(v))
	}
}
